package org.databandtech.flinkstreaming;

import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.databandtech.flinkstreaming.Entity.ItemCount;

/**
 * Window function that turns the count received for one key and one time window
 * into a single {@link ItemCount} record.
 *
 * <p>The key is split on {@code '-'}: the first segment is used as the system name and
 * the second segment as the action. Any further segments are ignored.
 */
public class KafkaWindowFunction implements WindowFunction<
												Long,     // input type
												ItemCount,  // output type
												String,     // key type
												TimeWindow> {

	private static final long serialVersionUID = -1205575505076577764L;

	/** Label passed unchanged as the first constructor argument of every emitted {@link ItemCount}. */
	private final String type;

	/**
	 * Creates the window function.
	 *
	 * @param type label attached to every {@link ItemCount} this function emits
	 */
	public KafkaWindowFunction(String type) {
		this.type = type;
	}

	/**
	 * Emits exactly one {@link ItemCount} built from the key, the window end and the
	 * first element of {@code input}.
	 *
	 * @param key    window key of the form {@code "<sys>-<action>"}
	 * @param window window being evaluated; its end timestamp is copied into the result
	 * @param input  values for this key and window; only the first element is read
	 * @param out    collector that receives the resulting {@link ItemCount}
	 * @throws IllegalArgumentException if {@code key} does not yield at least two segments when split on {@code '-'}
	 * @throws Exception                any failure propagated from the iterator (e.g. when {@code input} has no elements)
	 */
	@Override
	public void apply(String key, TimeWindow window, Iterable<Long> input, Collector<ItemCount> out) throws Exception {
		// Split once and reuse the result instead of re-running the split for every segment.
		final String[] parts = key.split("-");
		if (parts.length < 2) {
			// Fail with a message that names the offending key rather than a bare index error.
			throw new IllegalArgumentException(
					"Window key must have the form '<sys>-<action>', got: " + key);
		}
		final String sys = parts[0];
		final String action = parts[1];
		final long windowEnd = window.getEnd();
		// Only the FIRST element of the iterable is used as the count.
		// NOTE(review): presumably an upstream incremental aggregation leaves a single
		// pre-aggregated count per key and window - confirm against the caller.
		final Long count = input.iterator().next();
		out.collect(new ItemCount(type, sys, action, windowEnd, count));
	}
}
